Assignment 03

Author
Affiliation

Joshua Lawrence

Boston University

Published

September 21, 2025

Modified

September 23, 2025

import pandas as pd
import plotly.express as px
import plotly.io as pio
from pyspark.sql import SparkSession
import re
import numpy as np
import plotly.graph_objects as go
from pyspark.sql.functions import col, split, explode, regexp_replace, transform, when
from pyspark.sql import functions as F
from pyspark.sql.functions import col, monotonically_increasing_id

np.random.seed(123)

pio.renderers.default = "vscode+svg+jpg"

# Initialize Spark Session
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Load Data
df = spark.read.option("header", "true").option("inferSchema", "true").option("multiLine","true").option("escape", "\"").csv("data/lightcast_job_postings.csv")
df.createOrReplaceTempView("job_postings")

# # Show Schema and Sample Data
# print("---This is Diagnostic check, No need to print it in the final doc---")

# df.printSchema() # comment this line when rendering the submission
# df.show(5)
[Stage 8:>                                                          (0 + 1) / 1]                                                                                

1 Data Prep / Cleaning

df = df.withColumn("SALARY_FROM", col("SALARY_FROM").cast("float")) \
.withColumn("SALARY", col("SALARY").cast("float")) \
.withColumn("SALARY_TO", col("SALARY_TO").cast("float")) \
.withColumn("MAX_YEARS_EXPERIENCE", col("MAX_YEARS_EXPERIENCE").cast("float")) \
.withColumn("MIN_YEARS_EXPERIENCE", col("MIN_YEARS_EXPERIENCE").cast("float"))

def compute_median(sdf, col_name):
    q = sdf.approxQuantile(col_name, [0.5], 0.01)
    return q[0] if q else None

median_from = compute_median(df, "SALARY_FROM")
median_to = compute_median(df, "SALARY_TO")
median_salary = compute_median(df, "SALARY")

print("Medians:", median_from, median_to, median_salary)

df = df.fillna({
  "SALARY_FROM": median_from,
  "SALARY_TO": median_to,
  "SALARY": median_salary
})

df = df.withColumn("Average_Salary", (col("SALARY_FROM") + col("SALARY_TO")) / 2)

export_cols = [
  "EDUCATION_LEVELS_NAME",
  "REMOTE_TYPE_NAME",
  "MAX_YEARS_EXPERIENCE",
  "Average_Salary",
  "SALARY",
  "EMPLOYMENT_TYPE_NAME",
  "LOT_V6_SPECIALIZED_OCCUPATION_NAME"
]
df_selected = df.select(*export_cols)

pdf = df_selected.toPandas()
pdf.to_csv("./data/lightcast_cleaned.csv", index=False)

print("Data cleaning complete. Rows retained:", len(pdf))
[Stage 9:>                                                          (0 + 1) / 1]                                                                                [Stage 10:>                                                         (0 + 1) / 1]                                                                                [Stage 11:>                                                         (0 + 1) / 1]                                                                                
Medians: 87295.0 130042.0 115024.0
[Stage 12:>                                                         (0 + 1) / 1]                                                                                
Data cleaning complete. Rows retained: 72498
# salary_df = df.filter(col("SALARY").isNotNull() & (col("SALARY") > 0))
# fig = px.histogram(salary_df.toPandas(), x="SALARY", nbins=50, title="Salary Distribution")
# fig.update_layout(bargap=0.1) 
# fig.show()

Visualize Results -Create a box plot where:

pdf = df_selected.filter(df["SALARY"] > 0).select("EMPLOYMENT_TYPE_NAME", "SALARY").toPandas()
pdf = pdf.dropna()
[Stage 13:>                                                         (0 + 1) / 1]                                                                                
pdf["EMPLOYMENT_TYPE_NAME"] = pdf["EMPLOYMENT_TYPE_NAME"].apply(lambda x: re.sub(r"[^\x00-\x7F]+", "", x))
median_salaries = pdf.groupby("EMPLOYMENT_TYPE_NAME")["SALARY"].median()
median_salaries.head()
EMPLOYMENT_TYPE_NAME
Full-time (> 32 hours)    115024.0
Part-time ( 32 hours)     115024.0
Part-time / full-time     115024.0
Name: SALARY, dtype: float32
sorted_employment_types = median_salaries.sort_values(ascending=False).index

pdf["EMPLOYMENT_TYPE_NAME"] = pd.Categorical(
  pdf["EMPLOYMENT_TYPE_NAME"],
  categories=sorted_employment_types,
  ordered=True
)
fig = px.box(
  pdf,
  x="EMPLOYMENT_TYPE_NAME",
  y="SALARY",
  title="Salary Distribution by Employment Type",
  color_discrete_sequence=["blue"],
  boxmode="group",
  points="all",
)
fig.update_layout(
  title=dict(
    text="Salary Distribution by Employment Type",
    font=dict(size=30, family="Arial", color="black", weight="bold")
  ),
  xaxis=dict(
    title=dict(text="Employment Type", font=dict(size=24, family="Arial", color="black", weight="bold")),
    tickangle=0,
    tickfont=dict(size=18, family="Arial", color="black", weight="bold"),
    showline=True,
    linewidth=2,
    linecolor="black",
    mirror=True,
    showgrid=False,
    categoryorder="array",
    categoryarray=sorted_employment_types.tolist()
  ),
  yaxis=dict(
    title=dict(text="Salary (K $)", font=dict(size=24, family="Arial", color="black", weight="bold")),
    tickvals=[0, 50000, 100000, 150000, 200000, 250000, 300000, 350000, 400000, 450000, 500000],
    ticktext=["0", "50K", "100K", "150K", "200K", "300K", "350K", "400K", "450K", "500K"],
    tickfont=dict(size=18, family="Arial", color="black", weight="bold"),
    showline=True,
    linewidth=2,
    linecolor="black",
    mirror=True,
    showgrid=True,
    gridcolor="lightgray",
    gridwidth=0.5
  ),
  font=dict(family="Arial", size=16, color="black"),
  boxgap=0.7,
  plot_bgcolor="white",
  paper_bgcolor="white",
  showlegend=False,
  height=500,
  width=850
)
fig.show(renderer="notebook")
fig.write_html("output/Q1.html")
print("/output/Q1.html")
/output/Q1.html